package com.atguigu.gmall.realtime.app.dws;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.gmall.realtime.app.func.DimAsyncFunction;
import com.atguigu.gmall.realtime.bean.TradeProvinceOrderBean;
import com.atguigu.gmall.realtime.util.DateFormatUtil;
import com.atguigu.gmall.realtime.util.MyClickhouseUtil;
import com.atguigu.gmall.realtime.util.MyKafkaUtil;
import com.atguigu.gmall.realtime.util.TimestampLtz3CompareUtil;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.Collections;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;

/**
 * @author Felix
 * @date 2022/10/11
 * 交易域：按照省份维度对下单业务数据进行聚合统计
 * 需要启动的进程
 *      zk、kafka、maxwell、hdfs、hbase、redis、clickhouse
 *      DwdTradeOrderPreProcess、DwdTradeOrderDetail、DwsTradeProvinceOrderWindow
 */
public class DwsTradeProvinceOrderWindow {
    public static void main(String[] args) throws Exception {
        //TODO 1.基本环境准备
        //1.1 指定流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //1.2 设置并行度
        env.setParallelism(4);
        /*
        //TODO 2.检查点相关设置
        //2.1 开启检查点
        env.enableCheckpointing(5000L, CheckpointingMode.AT_LEAST_ONCE);
        //2.2 设置检查点超时时间
        env.getCheckpointConfig().setCheckpointTimeout(6000L);
        //2.3 设置job取消之后 检查点是否保留
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        //2.4 设置两个检查点之间最小时间间隔
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2000L);
        //2.5 设置重启策略
        env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.days(30),Time.seconds(3)));
        //2.6 设置状态后端
        env.setStateBackend(new HashMapStateBackend());
        env.getCheckpointConfig().setCheckpointStorage("hdfs://xxxxx");
        //2.7 设置操作hadoop的用户
        System.setProperty("HADOOP_USER_NAME","atguigu");
        */
        //TODO 3.从kafka主题中读取下单数据
        //3.1 声明消费的主题以及消费者组
        String topic = "dwd_trade_order_detail";
        String groupId = "dws_trade_province_order_group";
        //3.2 创建消费者对象
        FlinkKafkaConsumer<String> kafkaConsumer = MyKafkaUtil.getKafkaConsumer(topic, groupId);
        //3.3 消费数据 封装为流
        DataStreamSource<String> kafkaStrDS = env.addSource(kafkaConsumer);
        // {"id":"2081","order_id":"882","user_id":"389","sku_id":"10","sku_name":"Apple iPhone 1","province_id":"27","activity_id":"2",
        // "activity_rule_id":"4","coupon_id":null,"date_id":"2022-10-08","create_time":"2022-10-08 16:36:18","source_id":null,
        // "source_type_code":"2401","source_type_name":"用户查询","sku_num":"3","split_original_amount":"24591.0000",
        // "split_activity_amount":"565.51","split_coupon_amount":null,"split_total_amount":"24025.49","ts":"1665477378",
        // "row_op_ts":"2022-10-11 08:36:19.846Z"}
        // kafkaStrDS.print(">>>>");
        //TODO 4.对读取数据进行类型转换    jsonStr->jsonObj
        SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaStrDS.map(JSON::parseObject);
        //TODO 5.按照订单明细id进行分组
        KeyedStream<JSONObject, String> orderDetailIdKeyedDS = jsonObjDS.keyBy(jsonObj -> jsonObj.getString("id"));
        //TODO 6.使用Flink的状态 + 定时器去重
        SingleOutputStreamOperator<JSONObject> distinctDS = orderDetailIdKeyedDS.process(
            new KeyedProcessFunction<String, JSONObject, JSONObject>() {
                private ValueState<JSONObject> lastJsonObjState;

                @Override
                public void open(Configuration parameters) throws Exception {
                    lastJsonObjState = getRuntimeContext().getState(new ValueStateDescriptor<JSONObject>("lastJsonObjState", JSONObject.class));
                }

                @Override
                public void processElement(JSONObject jsonObj, Context ctx, Collector<JSONObject> out) throws Exception {
                    JSONObject lastJsonObj = lastJsonObjState.value();
                    if (lastJsonObj == null) {
                        lastJsonObjState.update(jsonObj);
                        long currentProcessingTime = ctx.timerService().currentProcessingTime();
                        ctx.timerService().registerProcessingTimeTimer(currentProcessingTime + 5000L);
                    } else {
                        String lastRowOpTs = lastJsonObj.getString("row_op_ts");
                        String curRowOpTs = jsonObj.getString("row_op_ts");
                        if (TimestampLtz3CompareUtil.compare(lastRowOpTs, curRowOpTs) <= 0) {
                            lastJsonObjState.update(jsonObj);
                        }
                    }
                }

                @Override
                public void onTimer(long timestamp, OnTimerContext ctx, Collector<JSONObject> out) throws Exception {
                    JSONObject jsonObj = lastJsonObjState.value();
                    if (jsonObj != null) {
                        out.collect(jsonObj);
                    }
                    lastJsonObjState.clear();
                }
            }
        );
        //TODO 7.将流中数据类型进行转换    jsonObj - >实体类对象
        SingleOutputStreamOperator<TradeProvinceOrderBean> orderBeanDS = distinctDS.map(
            new MapFunction<JSONObject, TradeProvinceOrderBean>() {
                @Override
                public TradeProvinceOrderBean map(JSONObject jsonObj) throws Exception {
                    String provinceId = jsonObj.getString("province_id");
                    String orderId = jsonObj.getString("order_id");
                    Double orderAmount = jsonObj.getDouble("split_total_amount");
                    Long ts = jsonObj.getLong("ts") * 1000L;

                    TradeProvinceOrderBean tradeProvinceOrderWindow = TradeProvinceOrderBean.builder()
                        .provinceId(provinceId)
                        .orderIdSet(new HashSet<String>(
                            Collections.singleton(orderId)
                        ))
                        .orderAmount(orderAmount)
                        .ts(ts)
                        .build();
                    return tradeProvinceOrderWindow;

                }
            }
        );
        //TODO 8.指定Watermark以及提取事件时间字段
        SingleOutputStreamOperator<TradeProvinceOrderBean> withWatermarkDS = orderBeanDS.assignTimestampsAndWatermarks(
            WatermarkStrategy
                .<TradeProvinceOrderBean>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                .withTimestampAssigner(
                    new SerializableTimestampAssigner<TradeProvinceOrderBean>() {
                        @Override
                        public long extractTimestamp(TradeProvinceOrderBean orderBean, long recordTimestamp) {
                            return orderBean.getTs();
                        }
                    }
                )
        );
        //TODO 9.按照省份维度进行分组
        KeyedStream<TradeProvinceOrderBean, String> provinceIdKeyedDS = withWatermarkDS.keyBy(TradeProvinceOrderBean::getProvinceId);
        //TODO 10.开窗
        WindowedStream<TradeProvinceOrderBean, String, TimeWindow> windowDS = provinceIdKeyedDS.window(TumblingEventTimeWindows.of(Time.seconds(10)));
        //TODO 11.聚合
        SingleOutputStreamOperator<TradeProvinceOrderBean> reduceDS = windowDS.reduce(
            new ReduceFunction<TradeProvinceOrderBean>() {
                @Override
                public TradeProvinceOrderBean reduce(TradeProvinceOrderBean value1, TradeProvinceOrderBean value2) throws Exception {
                    value1.getOrderIdSet().addAll(value2.getOrderIdSet());
                    value1.setOrderAmount(value1.getOrderAmount() + value2.getOrderAmount());
                    return value1;
                }
            },
            new WindowFunction<TradeProvinceOrderBean, TradeProvinceOrderBean, String, TimeWindow>() {
                @Override
                public void apply(String s, TimeWindow window, Iterable<TradeProvinceOrderBean> input, Collector<TradeProvinceOrderBean> out) throws Exception {
                    for (TradeProvinceOrderBean orderBean : input) {
                        orderBean.setStt(DateFormatUtil.toYmdHms(window.getStart()));
                        orderBean.setEdt(DateFormatUtil.toYmdHms(window.getEnd()));
                        orderBean.setTs(System.currentTimeMillis());
                        orderBean.setOrderCount((long) orderBean.getOrderIdSet().size());
                        out.collect(orderBean);
                    }
                }
            }
        );
        //TODO 12.和省份维度进行关联
        SingleOutputStreamOperator<TradeProvinceOrderBean> withProvinceDS = AsyncDataStream.unorderedWait(
            reduceDS,
            new DimAsyncFunction<TradeProvinceOrderBean>("dim_base_province") {
                @Override
                public void join(TradeProvinceOrderBean orderBean, JSONObject dimInfoJsonObj) {
                    orderBean.setProvinceName(dimInfoJsonObj.getString("NAME"));
                }

                @Override
                public String getKey(TradeProvinceOrderBean orderBean) {
                    return orderBean.getProvinceId();
                }
            }, 60, TimeUnit.SECONDS
        );
        //TODO 13.将关联的结果写到ck中
        withProvinceDS.print(">>>>>");
        withProvinceDS.addSink(MyClickhouseUtil.getSinkFunction("insert into dws_trade_province_order_window values(?,?,?,?,?,?,?)"));
        env.execute();
    }
}
